sqlstore.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. package sqlstore
  import (
  	"context"
  	"fmt"
  	"net/url"
  	"os"
  	"path"
  	"path/filepath"
  	"sort"
  	"strings"
  	"testing"
  	"time"

  	"github.com/go-sql-driver/mysql"
  	"github.com/go-xorm/xorm"
  	"github.com/grafana/grafana/pkg/bus"
  	"github.com/grafana/grafana/pkg/infra/log"
  	m "github.com/grafana/grafana/pkg/models"
  	"github.com/grafana/grafana/pkg/registry"
  	"github.com/grafana/grafana/pkg/services/annotations"
  	"github.com/grafana/grafana/pkg/services/cache"
  	"github.com/grafana/grafana/pkg/services/sqlstore/migrations"
  	"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
  	"github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
  	"github.com/grafana/grafana/pkg/setting"
  	_ "github.com/grafana/grafana/pkg/tsdb/mssql"
  	"github.com/grafana/grafana/pkg/util"
  	_ "github.com/lib/pq"
  )
  28. var (
  29. x *xorm.Engine
  30. dialect migrator.Dialect
  31. sqlog log.Logger = log.New("sqlstore")
  32. )
  // ContextSessionName holds the name "db-session".
  // NOTE(review): presumably the key under which a DB session is attached to a
  // context; the code that uses it is not part of this section.
  const (
  	ContextSessionName = "db-session"
  )
  34. func init() {
  35. registry.Register(&registry.Descriptor{
  36. Name: "SqlStore",
  37. Instance: &SqlStore{},
  38. InitPriority: registry.High,
  39. })
  40. }
  // SqlStore is the SQL-backed storage service. Init reads the "database"
  // configuration section, opens the xorm engine, runs the migrations and
  // registers the store as the bus transaction manager.
  type SqlStore struct {
  	// Cfg, Bus and CacheService carry `inject` tags; InitTestDB sets them by hand.
  	Cfg          *setting.Cfg        `inject:""`
  	Bus          bus.Bus             `inject:""`
  	CacheService *cache.CacheService `inject:""`

  	// dbCfg is filled in by readConfig.
  	dbCfg DatabaseConfig
  	// engine is the xorm engine created by getEngine during Init.
  	engine *xorm.Engine
  	// log is the instance logger, created in Init.
  	log log.Logger
  	// Dialect is the migrator dialect built from the engine.
  	Dialect migrator.Dialect
  	// skipEnsureAdmin makes Init return without calling ensureAdminUser;
  	// InitTestDB sets it to true.
  	skipEnsureAdmin bool
  }
  51. func (ss *SqlStore) Init() error {
  52. ss.log = log.New("sqlstore")
  53. ss.readConfig()
  54. engine, err := ss.getEngine()
  55. if err != nil {
  56. return fmt.Errorf("Fail to connect to database: %v", err)
  57. }
  58. ss.engine = engine
  59. ss.Dialect = migrator.NewDialect(ss.engine)
  60. // temporarily still set global var
  61. x = engine
  62. dialect = ss.Dialect
  63. migrator := migrator.NewMigrator(x)
  64. migrations.AddMigrations(migrator)
  65. for _, descriptor := range registry.GetServices() {
  66. sc, ok := descriptor.Instance.(registry.DatabaseMigrator)
  67. if ok {
  68. sc.AddMigration(migrator)
  69. }
  70. }
  71. if err := migrator.Start(); err != nil {
  72. return fmt.Errorf("Migration failed err: %v", err)
  73. }
  74. // Init repo instances
  75. annotations.SetRepository(&SqlAnnotationRepo{})
  76. ss.Bus.SetTransactionManager(ss)
  77. // Register handlers
  78. ss.addUserQueryAndCommandHandlers()
  79. // ensure admin user
  80. if ss.skipEnsureAdmin {
  81. return nil
  82. }
  83. return ss.ensureAdminUser()
  84. }
  85. func (ss *SqlStore) ensureAdminUser() error {
  86. systemUserCountQuery := m.GetSystemUserCountStatsQuery{}
  87. err := ss.InTransaction(context.Background(), func(ctx context.Context) error {
  88. err := bus.DispatchCtx(ctx, &systemUserCountQuery)
  89. if err != nil {
  90. return fmt.Errorf("Could not determine if admin user exists: %v", err)
  91. }
  92. if systemUserCountQuery.Result.Count > 0 {
  93. return nil
  94. }
  95. cmd := m.CreateUserCommand{}
  96. cmd.Login = setting.AdminUser
  97. cmd.Email = setting.AdminUser + "@localhost"
  98. cmd.Password = setting.AdminPassword
  99. cmd.IsAdmin = true
  100. if err := bus.DispatchCtx(ctx, &cmd); err != nil {
  101. return fmt.Errorf("Failed to create admin user: %v", err)
  102. }
  103. ss.log.Info("Created default admin", "user", setting.AdminUser)
  104. return nil
  105. })
  106. return err
  107. }
  108. func (ss *SqlStore) buildExtraConnectionString(sep rune) string {
  109. if ss.dbCfg.UrlQueryParams == nil {
  110. return ""
  111. }
  112. var sb strings.Builder
  113. for key, values := range ss.dbCfg.UrlQueryParams {
  114. for _, value := range values {
  115. sb.WriteRune(sep)
  116. sb.WriteString(key)
  117. sb.WriteRune('=')
  118. sb.WriteString(value)
  119. }
  120. }
  121. return sb.String()
  122. }
  123. func (ss *SqlStore) buildConnectionString() (string, error) {
  124. cnnstr := ss.dbCfg.ConnectionString
  125. // special case used by integration tests
  126. if cnnstr != "" {
  127. return cnnstr, nil
  128. }
  129. switch ss.dbCfg.Type {
  130. case migrator.MYSQL:
  131. protocol := "tcp"
  132. if strings.HasPrefix(ss.dbCfg.Host, "/") {
  133. protocol = "unix"
  134. }
  135. cnnstr = fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&allowNativePasswords=true",
  136. ss.dbCfg.User, ss.dbCfg.Pwd, protocol, ss.dbCfg.Host, ss.dbCfg.Name)
  137. if ss.dbCfg.SslMode == "true" || ss.dbCfg.SslMode == "skip-verify" {
  138. tlsCert, err := makeCert(ss.dbCfg)
  139. if err != nil {
  140. return "", err
  141. }
  142. mysql.RegisterTLSConfig("custom", tlsCert)
  143. cnnstr += "&tls=custom"
  144. }
  145. cnnstr += ss.buildExtraConnectionString('&')
  146. case migrator.POSTGRES:
  147. host, port := util.SplitHostPortDefault(ss.dbCfg.Host, "127.0.0.1", "5432")
  148. if ss.dbCfg.Pwd == "" {
  149. ss.dbCfg.Pwd = "''"
  150. }
  151. if ss.dbCfg.User == "" {
  152. ss.dbCfg.User = "''"
  153. }
  154. cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s sslmode=%s sslcert=%s sslkey=%s sslrootcert=%s", ss.dbCfg.User, ss.dbCfg.Pwd, host, port, ss.dbCfg.Name, ss.dbCfg.SslMode, ss.dbCfg.ClientCertPath, ss.dbCfg.ClientKeyPath, ss.dbCfg.CaCertPath)
  155. cnnstr += ss.buildExtraConnectionString(' ')
  156. case migrator.SQLITE:
  157. // special case for tests
  158. if !filepath.IsAbs(ss.dbCfg.Path) {
  159. ss.dbCfg.Path = filepath.Join(ss.Cfg.DataPath, ss.dbCfg.Path)
  160. }
  161. os.MkdirAll(path.Dir(ss.dbCfg.Path), os.ModePerm)
  162. cnnstr = fmt.Sprintf("file:%s?cache=%s&mode=rwc", ss.dbCfg.Path, ss.dbCfg.CacheMode)
  163. cnnstr += ss.buildExtraConnectionString('&')
  164. default:
  165. return "", fmt.Errorf("Unknown database type: %s", ss.dbCfg.Type)
  166. }
  167. return cnnstr, nil
  168. }
  169. func (ss *SqlStore) getEngine() (*xorm.Engine, error) {
  170. connectionString, err := ss.buildConnectionString()
  171. if err != nil {
  172. return nil, err
  173. }
  174. sqlog.Info("Connecting to DB", "dbtype", ss.dbCfg.Type)
  175. engine, err := xorm.NewEngine(ss.dbCfg.Type, connectionString)
  176. if err != nil {
  177. return nil, err
  178. }
  179. engine.SetMaxOpenConns(ss.dbCfg.MaxOpenConn)
  180. engine.SetMaxIdleConns(ss.dbCfg.MaxIdleConn)
  181. engine.SetConnMaxLifetime(time.Second * time.Duration(ss.dbCfg.ConnMaxLifetime))
  182. // configure sql logging
  183. debugSql := ss.Cfg.Raw.Section("database").Key("log_queries").MustBool(false)
  184. if !debugSql {
  185. engine.SetLogger(&xorm.DiscardLogger{})
  186. } else {
  187. engine.SetLogger(NewXormLogger(log.LvlInfo, log.New("sqlstore.xorm")))
  188. engine.ShowSQL(true)
  189. engine.ShowExecTime(true)
  190. }
  191. return engine, nil
  192. }
  193. func (ss *SqlStore) readConfig() {
  194. sec := ss.Cfg.Raw.Section("database")
  195. cfgURL := sec.Key("url").String()
  196. if len(cfgURL) != 0 {
  197. dbURL, _ := url.Parse(cfgURL)
  198. ss.dbCfg.Type = dbURL.Scheme
  199. ss.dbCfg.Host = dbURL.Host
  200. pathSplit := strings.Split(dbURL.Path, "/")
  201. if len(pathSplit) > 1 {
  202. ss.dbCfg.Name = pathSplit[1]
  203. }
  204. userInfo := dbURL.User
  205. if userInfo != nil {
  206. ss.dbCfg.User = userInfo.Username()
  207. ss.dbCfg.Pwd, _ = userInfo.Password()
  208. }
  209. ss.dbCfg.UrlQueryParams = dbURL.Query()
  210. } else {
  211. ss.dbCfg.Type = sec.Key("type").String()
  212. ss.dbCfg.Host = sec.Key("host").String()
  213. ss.dbCfg.Name = sec.Key("name").String()
  214. ss.dbCfg.User = sec.Key("user").String()
  215. ss.dbCfg.ConnectionString = sec.Key("connection_string").String()
  216. ss.dbCfg.Pwd = sec.Key("password").String()
  217. }
  218. ss.dbCfg.MaxOpenConn = sec.Key("max_open_conn").MustInt(0)
  219. ss.dbCfg.MaxIdleConn = sec.Key("max_idle_conn").MustInt(2)
  220. ss.dbCfg.ConnMaxLifetime = sec.Key("conn_max_lifetime").MustInt(14400)
  221. ss.dbCfg.SslMode = sec.Key("ssl_mode").String()
  222. ss.dbCfg.CaCertPath = sec.Key("ca_cert_path").String()
  223. ss.dbCfg.ClientKeyPath = sec.Key("client_key_path").String()
  224. ss.dbCfg.ClientCertPath = sec.Key("client_cert_path").String()
  225. ss.dbCfg.ServerCertName = sec.Key("server_cert_name").String()
  226. ss.dbCfg.Path = sec.Key("path").MustString("data/grafana.db")
  227. ss.dbCfg.CacheMode = sec.Key("cache_mode").MustString("private")
  228. }
  229. func InitTestDB(t *testing.T) *SqlStore {
  230. t.Helper()
  231. sqlstore := &SqlStore{}
  232. sqlstore.skipEnsureAdmin = true
  233. sqlstore.Bus = bus.New()
  234. sqlstore.CacheService = cache.New(5*time.Minute, 10*time.Minute)
  235. dbType := migrator.SQLITE
  236. // environment variable present for test db?
  237. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  238. dbType = db
  239. }
  240. // set test db config
  241. sqlstore.Cfg = setting.NewCfg()
  242. sec, _ := sqlstore.Cfg.Raw.NewSection("database")
  243. sec.NewKey("type", dbType)
  244. switch dbType {
  245. case "mysql":
  246. sec.NewKey("connection_string", sqlutil.TestDB_Mysql.ConnStr)
  247. case "postgres":
  248. sec.NewKey("connection_string", sqlutil.TestDB_Postgres.ConnStr)
  249. default:
  250. sec.NewKey("connection_string", sqlutil.TestDB_Sqlite3.ConnStr)
  251. }
  252. // need to get engine to clean db before we init
  253. engine, err := xorm.NewEngine(dbType, sec.Key("connection_string").String())
  254. if err != nil {
  255. t.Fatalf("Failed to init test database: %v", err)
  256. }
  257. sqlstore.Dialect = migrator.NewDialect(engine)
  258. // temp global var until we get rid of global vars
  259. dialect = sqlstore.Dialect
  260. if err := dialect.CleanDB(); err != nil {
  261. t.Fatalf("Failed to clean test db %v", err)
  262. }
  263. if err := sqlstore.Init(); err != nil {
  264. t.Fatalf("Failed to init test database: %v", err)
  265. }
  266. sqlstore.engine.DatabaseTZ = time.UTC
  267. sqlstore.engine.TZLocation = time.UTC
  268. return sqlstore
  269. }
  270. func IsTestDbMySql() bool {
  271. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  272. return db == migrator.MYSQL
  273. }
  274. return false
  275. }
  276. func IsTestDbPostgres() bool {
  277. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  278. return db == migrator.POSTGRES
  279. }
  280. return false
  281. }
  // DatabaseConfig holds the settings read from the "database" configuration
  // section by SqlStore.readConfig.
  type DatabaseConfig struct {
  	// Type is the database type: the "type" key, or the scheme of "url".
  	// It selects the connection string format and is passed to xorm.NewEngine.
  	Type string
  	// Host is the "host" key, or the host part of "url".
  	Host string
  	// Name is the "name" key, or the first path segment of "url".
  	Name string
  	// User and Pwd are the credentials: "user"/"password", or the user info of "url".
  	User string
  	Pwd  string
  	// Path is the SQLite database file ("path", default "data/grafana.db").
  	Path string
  	// SslMode is the "ssl_mode" key.
  	SslMode string
  	// CaCertPath, ClientKeyPath and ClientCertPath are the "ca_cert_path",
  	// "client_key_path" and "client_cert_path" keys.
  	CaCertPath     string
  	ClientKeyPath  string
  	ClientCertPath string
  	// ServerCertName is the "server_cert_name" key.
  	ServerCertName string
  	// ConnectionString is the "connection_string" key (only read when "url" is
  	// not set). When non-empty it is used verbatim.
  	ConnectionString string
  	// MaxOpenConn is "max_open_conn" (default 0).
  	MaxOpenConn int
  	// MaxIdleConn is "max_idle_conn" (default 2).
  	MaxIdleConn int
  	// ConnMaxLifetime is "conn_max_lifetime" in seconds (default 14400).
  	ConnMaxLifetime int
  	// CacheMode is "cache_mode" (default "private"); used as the SQLite cache parameter.
  	CacheMode string
  	// UrlQueryParams are the query parameters of "url"; they are appended to
  	// the generated connection string.
  	UrlQueryParams map[string][]string
  }